{
 "cells": [
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Data Processing using Pyspark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "#import SparkSession\n",
    "from pyspark.sql import SparkSession"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "#create spar session object\n",
    "spark=SparkSession.builder.appName('data_processing').getOrCreate()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Load csv Dataset \n",
    "df=spark.read.csv('sample_data.csv',inferSchema=True,header=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "['ratings', 'age', 'experience', 'family', 'mobile']"
      ]
     },
     "execution_count": 4,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#columns of dataframe\n",
    "df.columns"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#check number of columns\n",
    "len(df.columns)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "33"
      ]
     },
     "execution_count": 6,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#number of records in dataframe\n",
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "(33, 5)\n"
     ]
    }
   ],
   "source": [
    "#shape of dataset\n",
    "print((df.count(),len(df.columns)))"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 8,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "root\n",
      " |-- ratings: integer (nullable = true)\n",
      " |-- age: integer (nullable = true)\n",
      " |-- experience: double (nullable = true)\n",
      " |-- family: integer (nullable = true)\n",
      " |-- mobile: string (nullable = true)\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#printSchema\n",
    "df.printSchema()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 11,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+\n",
      "|ratings|age|experience|family| mobile|\n",
      "+-------+---+----------+------+-------+\n",
      "|      3| 32|       9.0|     3|   Vivo|\n",
      "|      3| 27|      13.0|     3|  Apple|\n",
      "|      4| 22|       2.5|     0|Samsung|\n",
      "|      4| 37|      16.5|     4|  Apple|\n",
      "|      5| 27|       9.0|     1|     MI|\n",
      "+-------+---+----------+------+-------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#fisrt few rows of dataframe\n",
    "df.show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 12,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-------+\n",
      "|age| mobile|\n",
      "+---+-------+\n",
      "| 32|   Vivo|\n",
      "| 27|  Apple|\n",
      "| 22|Samsung|\n",
      "| 37|  Apple|\n",
      "| 27|     MI|\n",
      "+---+-------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#select only 2 columns\n",
    "df.select('age','mobile').show(5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 13,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+------------------+------------------+------------------+------+\n",
      "|summary|           ratings|               age|        experience|            family|mobile|\n",
      "+-------+------------------+------------------+------------------+------------------+------+\n",
      "|  count|                33|                33|                33|                33|    33|\n",
      "|   mean|3.5757575757575757|30.484848484848484|10.303030303030303|1.8181818181818181|  null|\n",
      "| stddev|1.1188806636071336|  6.18527087180309| 6.770731351213326|1.8448330794164254|  null|\n",
      "|    min|                 1|                22|               2.5|                 0| Apple|\n",
      "|    max|                 5|                42|              23.0|                 5|  Vivo|\n",
      "+-------+------------------+------------------+------------------+------------------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#info about dataframe\n",
    "df.describe().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 43,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql.types import StringType,DoubleType,IntegerType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 16,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+----------------+\n",
      "|ratings|age|experience|family|mobile |age_after_10_yrs|\n",
      "+-------+---+----------+------+-------+----------------+\n",
      "|3      |32 |9.0       |3     |Vivo   |42              |\n",
      "|3      |27 |13.0      |3     |Apple  |37              |\n",
      "|4      |22 |2.5       |0     |Samsung|32              |\n",
      "|4      |37 |16.5      |4     |Apple  |47              |\n",
      "|5      |27 |9.0       |1     |MI     |37              |\n",
      "|4      |27 |9.0       |0     |Oppo   |37              |\n",
      "|5      |37 |23.0      |5     |Vivo   |47              |\n",
      "|5      |37 |23.0      |5     |Samsung|47              |\n",
      "|3      |22 |2.5       |0     |Apple  |32              |\n",
      "|3      |27 |6.0       |0     |MI     |37              |\n",
      "+-------+---+----------+------+-------+----------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#with column\n",
    "df.withColumn(\"age_after_10_yrs\",(df[\"age\"]+10)).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 15,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+----------+\n",
      "|ratings|age|experience|family|mobile |age_double|\n",
      "+-------+---+----------+------+-------+----------+\n",
      "|3      |32 |9.0       |3     |Vivo   |32.0      |\n",
      "|3      |27 |13.0      |3     |Apple  |27.0      |\n",
      "|4      |22 |2.5       |0     |Samsung|22.0      |\n",
      "|4      |37 |16.5      |4     |Apple  |37.0      |\n",
      "|5      |27 |9.0       |1     |MI     |27.0      |\n",
      "|4      |27 |9.0       |0     |Oppo   |27.0      |\n",
      "|5      |37 |23.0      |5     |Vivo   |37.0      |\n",
      "|5      |37 |23.0      |5     |Samsung|37.0      |\n",
      "|3      |22 |2.5       |0     |Apple  |22.0      |\n",
      "|3      |27 |6.0       |0     |MI     |27.0      |\n",
      "+-------+---+----------+------+-------+----------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.withColumn('age_double',df['age'].cast(DoubleType())).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 17,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+----------------+\n",
      "|ratings|age|experience|family|mobile |age_after_10_yrs|\n",
      "+-------+---+----------+------+-------+----------------+\n",
      "|3      |32 |9.0       |3     |Vivo   |42              |\n",
      "|3      |27 |13.0      |3     |Apple  |37              |\n",
      "|4      |22 |2.5       |0     |Samsung|32              |\n",
      "|4      |37 |16.5      |4     |Apple  |47              |\n",
      "|5      |27 |9.0       |1     |MI     |37              |\n",
      "|4      |27 |9.0       |0     |Oppo   |37              |\n",
      "|5      |37 |23.0      |5     |Vivo   |47              |\n",
      "|5      |37 |23.0      |5     |Samsung|47              |\n",
      "|3      |22 |2.5       |0     |Apple  |32              |\n",
      "|3      |27 |6.0       |0     |MI     |37              |\n",
      "+-------+---+----------+------+-------+----------------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#with column\n",
    "df.withColumn(\"age_after_10_yrs\",(df[\"age\"]+10)).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 18,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+------+\n",
      "|ratings|age|experience|family|mobile|\n",
      "+-------+---+----------+------+------+\n",
      "|      3| 32|       9.0|     3|  Vivo|\n",
      "|      5| 37|      23.0|     5|  Vivo|\n",
      "|      4| 37|       6.0|     0|  Vivo|\n",
      "|      5| 37|      13.0|     1|  Vivo|\n",
      "|      4| 37|       6.0|     0|  Vivo|\n",
      "+-------+---+----------+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#filter the records \n",
    "df.filter(df['mobile']=='Vivo').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 20,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+---+-------+------+\n",
      "|age|ratings|mobile|\n",
      "+---+-------+------+\n",
      "| 32|      3|  Vivo|\n",
      "| 37|      5|  Vivo|\n",
      "| 37|      4|  Vivo|\n",
      "| 37|      5|  Vivo|\n",
      "| 37|      4|  Vivo|\n",
      "+---+-------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#filter the records \n",
    "df.filter(df['mobile']=='Vivo').select('age','ratings','mobile').show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 21,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+------+\n",
      "|ratings|age|experience|family|mobile|\n",
      "+-------+---+----------+------+------+\n",
      "|      5| 37|      23.0|     5|  Vivo|\n",
      "|      5| 37|      13.0|     1|  Vivo|\n",
      "+-------+---+----------+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#filter the multiple conditions\n",
    "df.filter(df['mobile']=='Vivo').filter(df['experience'] >10).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 22,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+------+\n",
      "|ratings|age|experience|family|mobile|\n",
      "+-------+---+----------+------+------+\n",
      "|      5| 37|      23.0|     5|  Vivo|\n",
      "|      5| 37|      13.0|     1|  Vivo|\n",
      "+-------+---+----------+------+------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#filter the multiple conditions\n",
    "df.filter((df['mobile']=='Vivo')&(df['experience'] >10)).show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 23,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+\n",
      "| mobile|\n",
      "+-------+\n",
      "|     MI|\n",
      "|   Oppo|\n",
      "|Samsung|\n",
      "|   Vivo|\n",
      "|  Apple|\n",
      "+-------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#Distinct Values in a column\n",
    "df.select('mobile').distinct().show()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 25,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "5"
      ]
     },
     "execution_count": 25,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#distinct value count\n",
    "df.select('mobile').distinct().count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 26,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----+\n",
      "|mobile |count|\n",
      "+-------+-----+\n",
      "|MI     |8    |\n",
      "|Oppo   |7    |\n",
      "|Samsung|6    |\n",
      "|Vivo   |5    |\n",
      "|Apple  |7    |\n",
      "+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy('mobile').count().show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 27,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+-----+\n",
      "|mobile |count|\n",
      "+-------+-----+\n",
      "|MI     |8    |\n",
      "|Oppo   |7    |\n",
      "|Apple  |7    |\n",
      "|Samsung|6    |\n",
      "|Vivo   |5    |\n",
      "+-------+-----+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Value counts\n",
    "df.groupBy('mobile').count().orderBy('count',ascending=False).show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 28,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------------+------------------+------------------+------------------+\n",
      "|mobile |avg(ratings)      |avg(age)          |avg(experience)   |avg(family)       |\n",
      "+-------+------------------+------------------+------------------+------------------+\n",
      "|MI     |3.5               |30.125            |10.1875           |1.375             |\n",
      "|Oppo   |2.857142857142857 |28.428571428571427|10.357142857142858|1.4285714285714286|\n",
      "|Samsung|4.166666666666667 |28.666666666666668|8.666666666666666 |1.8333333333333333|\n",
      "|Vivo   |4.2               |36.0              |11.4              |1.8               |\n",
      "|Apple  |3.4285714285714284|30.571428571428573|11.0              |2.7142857142857144|\n",
      "+-------+------------------+------------------+------------------+------------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Value counts\n",
    "df.groupBy('mobile').mean().show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 29,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------+--------+---------------+-----------+\n",
      "|mobile |sum(ratings)|sum(age)|sum(experience)|sum(family)|\n",
      "+-------+------------+--------+---------------+-----------+\n",
      "|MI     |28          |241     |81.5           |11         |\n",
      "|Oppo   |20          |199     |72.5           |10         |\n",
      "|Samsung|25          |172     |52.0           |11         |\n",
      "|Vivo   |21          |180     |57.0           |9          |\n",
      "|Apple  |24          |214     |77.0           |19         |\n",
      "+-------+------------+--------+---------------+-----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df.groupBy('mobile').sum().show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 30,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------+--------+---------------+-----------+\n",
      "|mobile |max(ratings)|max(age)|max(experience)|max(family)|\n",
      "+-------+------------+--------+---------------+-----------+\n",
      "|MI     |5           |42      |23.0           |5          |\n",
      "|Oppo   |4           |42      |23.0           |2          |\n",
      "|Samsung|5           |37      |23.0           |5          |\n",
      "|Vivo   |5           |37      |23.0           |5          |\n",
      "|Apple  |4           |37      |16.5           |5          |\n",
      "+-------+------------+--------+---------------+-----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Value counts\n",
    "df.groupBy('mobile').max().show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 31,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+------------+--------+---------------+-----------+\n",
      "|mobile |min(ratings)|min(age)|min(experience)|min(family)|\n",
      "+-------+------------+--------+---------------+-----------+\n",
      "|MI     |1           |27      |2.5            |0          |\n",
      "|Oppo   |2           |22      |6.0            |0          |\n",
      "|Samsung|2           |22      |2.5            |0          |\n",
      "|Vivo   |3           |32      |6.0            |0          |\n",
      "|Apple  |3           |22      |2.5            |0          |\n",
      "+-------+------------+--------+---------------+-----------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# Value counts\n",
    "df.groupBy('mobile').min().show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 32,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---------------+\n",
      "|mobile |sum(experience)|\n",
      "+-------+---------------+\n",
      "|MI     |81.5           |\n",
      "|Oppo   |72.5           |\n",
      "|Samsung|52.0           |\n",
      "|Vivo   |57.0           |\n",
      "|Apple  |77.0           |\n",
      "+-------+---------------+\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#Aggregation\n",
    "df.groupBy('mobile').agg({'experience':'sum'}).show(5,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 33,
   "metadata": {},
   "outputs": [],
   "source": [
    "# UDF\n",
    "from pyspark.sql.functions import udf\n"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 34,
   "metadata": {},
   "outputs": [],
   "source": [
    "#normal function \n",
    "def price_range(brand):\n",
    "    if brand in ['Samsung','Apple']:\n",
    "        return 'High Price'\n",
    "    elif brand =='MI':\n",
    "        return 'Mid Price'\n",
    "    else:\n",
    "        return 'Low Price'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 35,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+-----------+\n",
      "|ratings|age|experience|family|mobile |price_range|\n",
      "+-------+---+----------+------+-------+-----------+\n",
      "|3      |32 |9.0       |3     |Vivo   |Low Price  |\n",
      "|3      |27 |13.0      |3     |Apple  |High Price |\n",
      "|4      |22 |2.5       |0     |Samsung|High Price |\n",
      "|4      |37 |16.5      |4     |Apple  |High Price |\n",
      "|5      |27 |9.0       |1     |MI     |Mid Price  |\n",
      "|4      |27 |9.0       |0     |Oppo   |Low Price  |\n",
      "|5      |37 |23.0      |5     |Vivo   |Low Price  |\n",
      "|5      |37 |23.0      |5     |Samsung|High Price |\n",
      "|3      |22 |2.5       |0     |Apple  |High Price |\n",
      "|3      |27 |6.0       |0     |MI     |Mid Price  |\n",
      "+-------+---+----------+------+-------+-----------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#create udf using python function\n",
    "brand_udf=udf(price_range,StringType())\n",
    "#apply udf on dataframe\n",
    "df.withColumn('price_range',brand_udf(df['mobile'])).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 36,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+---------+\n",
      "|ratings|age|experience|family|mobile |age_group|\n",
      "+-------+---+----------+------+-------+---------+\n",
      "|3      |32 |9.0       |3     |Vivo   |senior   |\n",
      "|3      |27 |13.0      |3     |Apple  |young    |\n",
      "|4      |22 |2.5       |0     |Samsung|young    |\n",
      "|4      |37 |16.5      |4     |Apple  |senior   |\n",
      "|5      |27 |9.0       |1     |MI     |young    |\n",
      "|4      |27 |9.0       |0     |Oppo   |young    |\n",
      "|5      |37 |23.0      |5     |Vivo   |senior   |\n",
      "|5      |37 |23.0      |5     |Samsung|senior   |\n",
      "|3      |22 |2.5       |0     |Apple  |young    |\n",
      "|3      |27 |6.0       |0     |MI     |young    |\n",
      "+-------+---+----------+------+-------+---------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#using lambda function\n",
    "age_udf = udf(lambda age: \"young\" if age <= 30 else \"senior\", StringType())\n",
    "#apply udf on dataframe\n",
    "df.withColumn(\"age_group\", age_udf(df.age)).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 52,
   "metadata": {},
   "outputs": [],
   "source": [
    "#pandas udf\n",
    "from pyspark.sql.functions import pandas_udf, PandasUDFType"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 53,
   "metadata": {},
   "outputs": [],
   "source": [
    "#create python function\n",
    "def remaining_yrs(age):\n",
    "    yrs_left=100-age\n",
    "\n",
    "    return yrs_left"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 54,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+--------+\n",
      "|ratings|age|experience|family|mobile |yrs_left|\n",
      "+-------+---+----------+------+-------+--------+\n",
      "|3      |32 |9.0       |3     |Vivo   |68      |\n",
      "|3      |27 |13.0      |3     |Apple  |73      |\n",
      "|4      |22 |2.5       |0     |Samsung|78      |\n",
      "|4      |37 |16.5      |4     |Apple  |63      |\n",
      "|5      |27 |9.0       |1     |MI     |73      |\n",
      "|4      |27 |9.0       |0     |Oppo   |73      |\n",
      "|5      |37 |23.0      |5     |Vivo   |63      |\n",
      "|5      |37 |23.0      |5     |Samsung|63      |\n",
      "|3      |22 |2.5       |0     |Apple  |78      |\n",
      "|3      |27 |6.0       |0     |MI     |73      |\n",
      "+-------+---+----------+------+-------+--------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#create udf using python function\n",
    "length_udf = pandas_udf(remaining_yrs, IntegerType())\n",
    "#apply pandas udf on dataframe\n",
    "df.withColumn(\"yrs_left\", length_udf(df['age'])).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 57,
   "metadata": {},
   "outputs": [],
   "source": [
    "#udf using two columns \n",
    "def prod(rating,exp):\n",
    "    x=rating*exp\n",
    "    return x"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 58,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+-------+-------+\n",
      "|ratings|age|experience|family|mobile |product|\n",
      "+-------+---+----------+------+-------+-------+\n",
      "|3      |32 |9.0       |3     |Vivo   |27.0   |\n",
      "|3      |27 |13.0      |3     |Apple  |39.0   |\n",
      "|4      |22 |2.5       |0     |Samsung|10.0   |\n",
      "|4      |37 |16.5      |4     |Apple  |66.0   |\n",
      "|5      |27 |9.0       |1     |MI     |45.0   |\n",
      "|4      |27 |9.0       |0     |Oppo   |36.0   |\n",
      "|5      |37 |23.0      |5     |Vivo   |115.0  |\n",
      "|5      |37 |23.0      |5     |Samsung|115.0  |\n",
      "|3      |22 |2.5       |0     |Apple  |7.5    |\n",
      "|3      |27 |6.0       |0     |MI     |18.0   |\n",
      "+-------+---+----------+------+-------+-------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "#create udf using python function\n",
    "prod_udf = pandas_udf(prod, DoubleType())\n",
    "#apply pandas udf on multiple columns of dataframe\n",
    "df.withColumn(\"product\", prod_udf(df['ratings'],df['experience'])).show(10,False)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 45,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "33"
      ]
     },
     "execution_count": 45,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#duplicate values\n",
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 46,
   "metadata": {},
   "outputs": [],
   "source": [
    "#drop duplicate values\n",
    "df=df.dropDuplicates()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 47,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "26"
      ]
     },
     "execution_count": 47,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "#validate new count\n",
    "df.count()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 59,
   "metadata": {},
   "outputs": [],
   "source": [
    "#drop column of dataframe\n",
    "df_new=df.drop('mobile')"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 60,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-------+---+----------+------+\n",
      "|ratings|age|experience|family|\n",
      "+-------+---+----------+------+\n",
      "|      3| 32|       9.0|     3|\n",
      "|      3| 27|      13.0|     3|\n",
      "|      4| 22|       2.5|     0|\n",
      "|      4| 37|      16.5|     4|\n",
      "|      5| 27|       9.0|     1|\n",
      "|      4| 27|       9.0|     0|\n",
      "|      5| 37|      23.0|     5|\n",
      "|      5| 37|      23.0|     5|\n",
      "|      3| 22|       2.5|     0|\n",
      "|      3| 27|       6.0|     0|\n",
      "+-------+---+----------+------+\n",
      "only showing top 10 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "df_new.show(10)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# saving file (csv)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#current working directory\n",
    "pwd"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#target directory \n",
    "write_uri='/home/jovyan/work/df_csv'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#save the dataframe as single csv \n",
    "df.coalesce(1).write.format(\"csv\").option(\"header\",\"true\").save(write_uri)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# parquet"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#target location\n",
    "parquet_uri='/home/jovyan/work/df_parquet'"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "#save the data into parquet format \n",
    "df.write.format('parquet').save(parquet_uri)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.3"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
